@openinx (Member) commented Dec 28, 2020

Many people export the results of Flink aggregations into an Apache Iceberg table, for example:

SELECT DATE(click_timestamp) AS click_date, COUNT(click_num) AS click_num
FROM click_events
GROUP BY DATE(click_timestamp);

This streaming query counts clicks since the beginning of the day (00:00:00); every emitted event is an UPSERT that overwrites the previously accumulated click_num for that day.

In this case, we need to transform every INSERT/UPDATE_AFTER event into an UPSERT, which means a DELETE of the key followed by an INSERT.
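The DELETE + INSERT rewrite described above can be sketched with a minimal, self-contained model. `UpsertSketch`, its `Kind` enum, and the string events are hypothetical stand-ins for Flink's `RowKind` and Iceberg's writer, not the PR's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class UpsertSketch {
  // Simplified stand-in for Flink's RowKind.
  public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Rewrites one changelog event into the events an upsert writer would emit:
  // INSERT/UPDATE_AFTER become an equality DELETE of the key followed by an
  // INSERT of the new row.
  public static List<String> toUpsertEvents(Kind kind, String key) {
    List<String> out = new ArrayList<>();
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        out.add("DELETE " + key); // drop any previous row with this key
        out.add("INSERT " + key); // then write the new row
        break;
      case DELETE:
        out.add("DELETE " + key);
        break;
      default:
        // UPDATE_BEFORE can be dropped in upsert mode: the DELETE emitted for
        // the matching UPDATE_AFTER already removes the old row.
        break;
    }
    return out;
  }
}
```

With this rewrite, two consecutive UPDATE_AFTER events for the same day key each emit a DELETE then an INSERT, so only the latest click_num survives in the table.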

@github-actions github-actions bot added the flink label Dec 28, 2020
* All INSERT/UPDATE_AFTER events from the input stream will be transformed into UPSERT events, which means it will
* DELETE the old record and then INSERT the new record. In a partitioned table, the partition fields should be
* a subset of the equality fields; otherwise, an old row located in partition-A could not be deleted by a
* new row located in partition-B.
A reviewer (Contributor) commented:
Does anything validate this constraint?
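The snippets quoted in this thread don't show such a check; one could look like the following. This is a hypothetical sketch over plain field-id collections, not Iceberg's actual `PartitionSpec`/schema API:

```java
import java.util.List;
import java.util.Set;

public class PartitionConstraintSketch {
  // Returns true when every partition source field is also an equality field,
  // so the equality DELETE emitted for a key is guaranteed to land in the
  // same partition as the row it replaces.
  public static boolean partitionFieldsCovered(
      List<Integer> partitionSourceFieldIds, Set<Integer> equalityFieldIds) {
    return equalityFieldIds.containsAll(partitionSourceFieldIds);
  }
}
```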

switch (row.getRowKind()) {
  case INSERT:
  case UPDATE_AFTER:
    if (upsert) {
A reviewer (Contributor) commented:

It seems like this should only happen for the INSERT case because UPDATE_AFTER implies that there was an UPDATE_BEFORE that will perform the delete. This would delete the same row twice in that case, causing more equality deletes to be written for the row.
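The double delete this comment describes can be counted with a small model. It assumes the behavior under review (UPDATE_BEFORE always deletes; in upsert mode, INSERT and UPDATE_AFTER also delete the key before writing); `DoubleDeleteSketch` and its `Kind` enum are hypothetical stand-ins, not the PR's code:

```java
import java.util.List;

public class DoubleDeleteSketch {
  public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

  // Counts the equality deletes emitted for one key's changelog under the
  // assumed behavior.
  public static int equalityDeletes(List<Kind> changelog, boolean upsert) {
    int deletes = 0;
    for (Kind kind : changelog) {
      switch (kind) {
        case UPDATE_BEFORE:
          deletes++; // the retraction itself deletes the old row
          break;
        case INSERT:
        case UPDATE_AFTER:
          if (upsert) {
            deletes++; // upsert mode deletes the key before inserting
          }
          break;
      }
    }
    return deletes;
  }
}
```

For the changelog [UPDATE_BEFORE, UPDATE_AFTER] with upsert enabled, this yields two deletes for the same row, which is the duplication being pointed out; restricting the upsert delete to the INSERT case would yield one.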

@rdblue (Contributor) commented Dec 28, 2020

Mostly looks good, but I don't think that upsert should be supported for UPDATE_AFTER. Interested to hear your rationale for that case.

coolderli pushed a commit to coolderli/iceberg that referenced this pull request Apr 26, 2021
@himanshpal commented:

@rdblue @openinx, is there any update on this? We are currently seeing duplicate rows while writing/compacting CDC events to a table.

@wg1026688210 (Contributor) commented Jul 2, 2021

Can this PR be merged? In one of our scenarios, the TiDB binlog has no before_update record preceding after_update, so we hope Flink can handle the delete for us. @rdblue @openinx

@haormj (Contributor) commented Jul 16, 2021

@openinx @rdblue Is there any update on this?

@rdblue (Contributor) commented Jul 19, 2021

I think this is just waiting on someone to pick it up again. UPSERT should be unblocked now that row identifier fields have been added.

@Reo-LEI (Contributor) commented Jul 25, 2021

I'm picking this up in #2863 @rdblue

@openinx (Member, author) commented Jul 27, 2021

Since @Reo-LEI has picked this work up, I will close this PR now. Let's continue the review on #2863.

@openinx closed this Jul 27, 2021